/**
 * Java RTP Library (jlibrtp)
 * Copyright (C) 2006 Arne Kepp
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 */
package org.jlibrtp;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * This thread hangs on the RTCP socket and waits for new packets
 *
 * @author Arne Kepp
 *
 */
public class RTCPReceiverThread extends Thread {
    /** Logger instance. */
    private static final Logger LOGGER =
        Logger.getLogger(RTCPReceiverThread.class.getName());

    /** Size in bytes of the buffer allocated for each incoming datagram. */
    private static final int RECEIVE_BUFFER_SIZE = 1500;

    /** Parent RTP Session */
    private final RTPSession rtpSession;
    /** Parent RTCP Session */
    private final RTCPSession rtcpSession;

    /**
     * Constructor for new thread
     * @param rtcpSession parent RTCP session
     * @param rtpSession parent RTP session
     */
    RTCPReceiverThread(RTCPSession rtcpSession, RTPSession rtpSession) {
        this.rtpSession = rtpSession;
        this.rtcpSession = rtcpSession;

        if(LOGGER.isLoggable(Level.FINEST)) {
            LOGGER.finest("<-> RTCPReceiverThread created");
        }
        setName("RTCPReceiverThread");
    }

    /**
     * Find out whether a participant with this SSRC is known.
     *
     * If the SSRC is not in the participant database, the database is scanned
     * for a participant whose SSRC is still negative and whose RTCP or RTP
     * address has the same IP address as the sender of the packet; the first
     * such participant is assigned the SSRC and returned. If none matches, a
     * new participant is created from the packet's socket address and added
     * to the database.
     *
     * @param ssrc the SSRC of the participant
     * @param packet the packet that notified us
     * @return the relevant participant, possibly newly created; never null
     */
    private Participant findParticipant(long ssrc, DatagramPacket packet) {
        Participant known = rtpSession.partDb.getParticipant(ssrc);
        if(known != null) {
            return known;
        }

        Enumeration<Participant> enu = rtpSession.partDb.getParticipants();
        while(enu.hasMoreElements()) {
            Participant tmp = enu.nextElement();
            if(tmp.ssrc < 0 && hasSenderAddress(tmp, packet)) {
                // Best guess
                LOGGER.warning("RTCPReceiverThread: Got an unexpected packet from SSRC:"
                        + ssrc  + " @" + packet.getAddress().toString() + ", WAS able to match it." );

                tmp.ssrc = ssrc;
                return tmp;
            }
        }

        // Create an unknown sender
        LOGGER.warning("RTCPReceiverThread: Got an unexpected packet from SSRC:"
                + ssrc  + " @" + packet.getAddress().toString() + ", was NOT able to match it." );
        Participant created = new Participant((InetSocketAddress) null,
                (InetSocketAddress) packet.getSocketAddress(), ssrc);
        rtpSession.partDb.addParticipant(2, created);
        return created;
    }

    /**
     * Checks whether the participant's RTCP or RTP address carries the same
     * IP address as the sender of the packet.
     *
     * Either address of a participant may be null (this class itself creates
     * participants without an RTP address), so both are checked before use.
     *
     * @param part the participant to test
     * @param packet the received packet
     * @return true if one of the participant's addresses matches the sender
     */
    private static boolean hasSenderAddress(Participant part, DatagramPacket packet) {
        // equals(null) is false, so an unresolved socket address is handled too.
        return (part.rtcpAddress != null
                    && packet.getAddress().equals(part.rtcpAddress.getAddress()))
                || (part.rtpAddress != null
                    && packet.getAddress().equals(part.rtpAddress.getAddress()));
    }

    /**
     * Pads a packet whose length is not a multiple of four up to the next
     * multiple of four.
     *
     * The payload is copied into a new array, starting at the packet's offset,
     * and the added bytes are set to zero. A packet whose length is already a
     * multiple of four is returned unchanged.
     *
     * @param packet the packet to pad; its data, offset and length are replaced
     * @return the same packet instance
     */
    private DatagramPacket tryFixingPacket(DatagramPacket packet) {
        int length = packet.getLength();
        int remainder = length % 4;
        if(remainder == 0) {
            return packet;
        }

        int paddedLength = length + (4 - remainder);
        int offset = packet.getOffset();
        // copyOfRange zero-fills anything past the end of the source array;
        // the explicit fill clears buffer bytes that lie behind the payload.
        byte[] padded = Arrays.copyOfRange(packet.getData(), offset, offset + paddedLength);
        Arrays.fill(padded, length, paddedLength, (byte) 0);

        // setData(byte[]) resets the offset to 0 and the length to padded.length
        packet.setData(padded);
        return packet;
    }

    /**
     * Parse a received UDP packet
     *
     * Perform the header checks and extract the RTCP packets in it, then
     * dispatch each contained packet to the handler for its type.
     *
     * @param packet the packet to be parsed
     * @return -1 if there was a problem, 0 if successfully parsed
     */
    private int parsePacket(DatagramPacket packet) {
        if(packet.getLength() % 4 != 0) {
            packet = tryFixingPacket(packet);
        }

        // Defensive: padding above should always leave a multiple of four.
        if(packet.getLength() % 4 != 0) {
            if(LOGGER.isLoggable(Level.FINEST)) {
                LOGGER.finest("RTCPReceiverThread.parsePacket got packet that had length " + packet.getLength());
            }
            return -1;
        }

        byte[] rawPkt = packet.getData();

        // Parse the received compound RTCP (?) packet
        CompRtcpPkt compPkt = new CompRtcpPkt(rawPkt, packet.getLength(),
                (InetSocketAddress) packet.getSocketAddress(), rtpSession);

        notifyDebugListener(packet, compPkt);
        logParsedPackets(packet, compPkt);

        //Loop over the information
        Iterator<RtcpPkt> iter = compPkt.rtcpPkts.iterator();
        long curTime = System.currentTimeMillis();

        while(iter.hasNext()) {
            RtcpPkt aPkt = iter.next();

            // Our own packets should already have been filtered out.
            if(aPkt.ssrc == rtpSession.ssrc) {
                LOGGER.warning("RTCPReceiverThread() received RTCP packet"
                        + " with conflicting SSRC from " + packet.getSocketAddress().toString());
                rtpSession.resolveSsrcConflict();
                return -1;
            }

            // Exact class comparison on purpose: subclasses are not dispatched.
            if(aPkt.getClass() == RtcpPktRR.class) {
                handleReceiverReport((RtcpPktRR) aPkt, packet, curTime);
            } else if(aPkt.getClass() == RtcpPktSR.class) {
                handleSenderReport((RtcpPktSR) aPkt, packet, curTime);
            } else if(aPkt.getClass() == RtcpPktSDES.class) {
                handleSourceDescription((RtcpPktSDES) aPkt);
            } else if(aPkt.getClass() == RtcpPktBYE.class) {
                handleBye((RtcpPktBYE) aPkt);
            } else if(aPkt.getClass() == RtcpPktAPP.class) {
                handleApp((RtcpPktAPP) aPkt, packet);
            }
        }
        return 0;
    }

    /**
     * Reports the received compound packet to the debug application interface,
     * if one is registered on the RTP session.
     *
     * @param packet the received UDP packet
     * @param compPkt the compound packet parsed from it
     */
    private void notifyDebugListener(DatagramPacket packet, CompRtcpPkt compPkt) {
        if(rtpSession.debugAppIntf == null) {
            return;
        }

        String intfStr;
        if(rtpSession.mcSession) {
            intfStr = rtcpSession.rtcpMCSock.getLocalSocketAddress().toString();
        } else {
            intfStr = rtpSession.rtpSock.getLocalSocketAddress().toString();
        }

        if(compPkt.problem == 0) {
            String str = "Received compound RTCP packet of size " + packet.getLength() +
                    " from " + packet.getSocketAddress().toString() + " via " + intfStr
                    + " containing " + compPkt.rtcpPkts.size() + " packets";

            rtpSession.debugAppIntf.packetReceived(1,
                    (InetSocketAddress) packet.getSocketAddress(), str);
        } else {
            String str = "Received invalid RTCP packet of size " + packet.getLength() +
                    " from " + packet.getSocketAddress().toString() + " via " +  intfStr
                    + ": " + debugErrorString(compPkt.problem);

            rtpSession.debugAppIntf.packetReceived(-2,
                    (InetSocketAddress) packet.getSocketAddress(), str);
        }
    }

    /**
     * Logs, at FINEST level, the class and item count of every RTCP packet
     * contained in the compound packet.
     *
     * @param packet the received UDP packet
     * @param compPkt the compound packet parsed from it
     */
    private void logParsedPackets(DatagramPacket packet, CompRtcpPkt compPkt) {
        if(!LOGGER.isLoggable(Level.FINEST)) {
            return;
        }

        StringBuilder summary = new StringBuilder(" ");
        Iterator<RtcpPkt> iter = compPkt.rtcpPkts.iterator();
        while(iter.hasNext()) {
            RtcpPkt aPkt = iter.next();
            summary.append(aPkt.getClass().toString() + ":" + aPkt.itemCount + ", ");
        }
        LOGGER.finest("<-> RTCPReceiverThread.parsePacket() from "
                + packet.getSocketAddress().toString() + summary);
    }

    /**
     * Handles a Receiver Report: records when the sender was last heard from
     * and forwards the report to the RTCP application interface, if any.
     *
     * @param rrPkt the receiver report
     * @param packet the UDP packet that carried it
     * @param curTime the time in milliseconds at which parsing started
     */
    private void handleReceiverReport(RtcpPktRR rrPkt, DatagramPacket packet, long curTime) {
        Participant p = findParticipant(rrPkt.ssrc, packet);
        p.lastRtcpPkt = curTime;

        if(rtpSession.rtcpAppIntf != null) {
            rtpSession.rtcpAppIntf.RRPktReceived(rrPkt.ssrc, rrPkt.reporteeSsrc,
                    rrPkt.lossFraction, rrPkt.lostPktCount, rrPkt.extHighSeqRecv,
                    rrPkt.interArvJitter, rrPkt.timeStampLSR, rrPkt.delaySR);
        }
    }

    /**
     * Handles a Sender Report: updates the sender's NTP/RTP timing state and
     * forwards the report to the RTCP application interface, if any.
     *
     * @param srPkt the sender report
     * @param packet the UDP packet that carried it
     * @param curTime the time in milliseconds at which parsing started
     */
    private void handleSenderReport(RtcpPktSR srPkt, DatagramPacket packet, long curTime) {
        // findParticipant never returns null, so no null check is needed.
        Participant p = findParticipant(srPkt.ssrc, packet);
        p.lastRtcpPkt = curTime;

        if(p.ntpGradient < 0 && p.lastNtpTs1 > -1) {
            //Calculate gradient NTP vs RTP
            // NOTE(review): if the RTP timestamp equals the previous one, the
            // divisor is zero and the gradient becomes infinite or NaN - confirm
            // whether callers rely on that.
            long newTime = StaticProcs.undoNtpMess(srPkt.ntpTs1, srPkt.ntpTs2);
            p.ntpGradient = ((double) (newTime - p.ntpOffset))/((double) srPkt.rtpTs - p.lastSRRtpTs);
            if(LOGGER.isLoggable(Level.FINEST)) {
                LOGGER.finest("RTCPReceiverThread calculated NTP vs RTP gradient: " + Double.toString(p.ntpGradient));
            }
        } else {
            // Calculate sum of ntpTs1 and ntpTs2 in milliseconds
            p.ntpOffset = StaticProcs.undoNtpMess(srPkt.ntpTs1, srPkt.ntpTs2);
            p.lastNtpTs1 = srPkt.ntpTs1;
            p.lastNtpTs2 = srPkt.ntpTs2;
            p.lastSRRtpTs = srPkt.rtpTs;
        }

        // For the next RR
        p.timeReceivedLSR = curTime;
        p.setTimeStampLSR(srPkt.ntpTs1,srPkt.ntpTs2);

        if(rtpSession.rtcpAppIntf != null) {
            if(srPkt.rReports != null) {
                rtpSession.rtcpAppIntf.SRPktReceived(srPkt.ssrc, srPkt.ntpTs1, srPkt.ntpTs2,
                        srPkt.rtpTs, srPkt.sendersPktCount, srPkt.sendersOctCount,
                        srPkt.rReports.reporteeSsrc, srPkt.rReports.lossFraction, srPkt.rReports.lostPktCount,
                        srPkt.rReports.extHighSeqRecv, srPkt.rReports.interArvJitter, srPkt.rReports.timeStampLSR,
                        srPkt.rReports.delaySR);
            } else {
                rtpSession.rtcpAppIntf.SRPktReceived(srPkt.ssrc, srPkt.ntpTs1, srPkt.ntpTs2,
                        srPkt.rtpTs, srPkt.sendersPktCount, srPkt.sendersOctCount,
                        null, null, null,
                        null, null, null,
                        null);
            }
        }
    }

    /**
     * Handles a Source Description packet by passing its participants to the
     * application interfaces that are registered.
     *
     * @param sdesPkt the source description packet
     */
    private void handleSourceDescription(RtcpPktSDES sdesPkt) {
        if(rtpSession.appIntf != null) {
            rtpSession.appIntf.userEvent(4, sdesPkt.participants);
        }

        // The participant database is updated
        // when the SDES packet is reconstructed by CompRtcpPkt
        if(rtpSession.rtcpAppIntf != null) {
            rtpSession.rtcpAppIntf.SDESPktReceived(sdesPkt.participants);
        }
    }

    /**
     * Handles a BYE packet: stamps every known participant listed in it with
     * the current time and notifies the registered application interfaces.
     *
     * SSRCs that are not in the participant database yield null entries in the
     * array handed to the callbacks.
     *
     * @param byePkt the BYE packet
     */
    private void handleBye(RtcpPktBYE byePkt) {
        long time = System.currentTimeMillis();
        Participant[] partArray = new Participant[byePkt.ssrcArray.length];

        for(int i = 0; i < byePkt.ssrcArray.length; i++) {
            partArray[i] = rtpSession.partDb.getParticipant(byePkt.ssrcArray[i]);
            if(partArray[i] != null) {
                partArray[i].timestampBYE = time;
            }
        }

        if(rtpSession.appIntf != null) {
            rtpSession.appIntf.userEvent(1, partArray);
        }
        if(rtpSession.rtcpAppIntf != null) {
            String reason = (byePkt.reason == null ? null : new String(byePkt.reason));
            rtpSession.rtcpAppIntf.BYEPktReceived(partArray, reason);
        }
    }

    /**
     * Handles an application specific packet by resolving its sender and
     * forwarding the content to the RTCP application interface, if any.
     *
     * @param appPkt the application specific packet
     * @param packet the UDP packet that carried it
     */
    private void handleApp(RtcpPktAPP appPkt, DatagramPacket packet) {
        Participant part = findParticipant(appPkt.ssrc, packet);

        if(rtpSession.rtcpAppIntf != null) {
            rtpSession.rtcpAppIntf.APPPktReceived(part, appPkt.itemCount, appPkt.pktName, appPkt.pktData);
        }
    }

    /**
     * Returns a legible message when an error occurs
     *
     * @param errorCode the internal error code, commonly negative of packet type
     * @return a string that is hopefully somewhat informative
     */
    private String debugErrorString(int errorCode) {
        String aStr = "";
        switch(errorCode) {
        case -1: aStr = "The first packet was not of type SR or RR."; break;
        case -2: aStr = "The padding bit was set for the first packet."; break;
        case -200: aStr = " Error parsing Sender Report packet."; break;
        case -201: aStr = " Error parsing Receiver Report packet."; break;
        case -202: aStr = " Error parsing SDES packet"; break;
        case -203: aStr = " Error parsing BYE packet."; break;
        case -204: aStr = " Error parsing Application specific packet."; break;
        case -205: aStr = " Error parsing RTP Feedback packet."; break;
        case -206: aStr = " Error parsing Payload-Specific Feedback packet."; break;
        default:
            aStr = "Unknown error code " + errorCode + ".";
        }

        return aStr;
    }

    /**
     * Blocks until a datagram arrives on the socket of the current session
     * mode (multicast or unicast) and stores it in the given packet.
     *
     * A failure is logged unless the session is ending, in which case it is
     * expected and stays silent.
     *
     * @param packet the packet to fill
     * @return true if a datagram was received, false if receiving failed
     */
    private boolean receiveInto(DatagramPacket packet) {
        try {
            if(rtpSession.mcSession) {
                //Multicast
                rtcpSession.rtcpMCSock.receive(packet);
            } else {
                //Unicast
                rtcpSession.rtcpSock.receive(packet);
            }
            return true;
        } catch (IOException e) {
            if(!rtpSession.endSession) {
                LOGGER.log(Level.WARNING, "RTCPReceiverThread failed to receive RTCP packet", e);
            }
            return false;
        }
    }

    /**
     * Checks whether the packet was sent from the local address of the socket
     * this thread is listening on.
     *
     * The sender is compared with the socket's local socket address; comparing
     * with the socket object itself could never be equal.
     *
     * @param packet the received packet
     * @return true if the sender address equals our own socket address
     */
    private boolean isFromOwnSocket(DatagramPacket packet) {
        if(rtpSession.mcSession) {
            return packet.getSocketAddress().equals(rtcpSession.rtcpMCSock.getLocalSocketAddress());
        }
        return packet.getSocketAddress().equals(rtcpSession.rtcpSock.getLocalSocketAddress());
    }

    /**
     * Start the RTCP receiver thread.
     *
     * It will
     * 1) run when it receives a packet
     * 2) parse the packet
     * 3) call any relevant callback functions, update database
     * 4) block until the next one arrives.
     *
     * The thread returns immediately if the socket for the current session
     * mode is missing, and leaves the loop once the session is flagged as
     * ended.
     */
    @Override
    public void run() {
        // Verify the socket exists before anything (including logging) touches it.
        if(rtpSession.mcSession) {
            if(rtcpSession.rtcpMCSock == null) {
                return;
            }
        } else {
            if(rtcpSession.rtcpSock == null) {
                return;
            }
        }

        if(LOGGER.isLoggable(Level.FINEST)) {
            if(rtpSession.mcSession) {
                LOGGER.finest("-> RTCPReceiverThread.run() starting on MC " + rtcpSession.rtcpMCSock.getLocalPort() );
            } else {
                LOGGER.finest("-> RTCPReceiverThread.run() starting on " + rtcpSession.rtcpSock.getLocalPort() );
            }
        }

        while(!rtpSession.endSession) {

            if(LOGGER.isLoggable(Level.FINEST)) {
                if(rtpSession.mcSession) {
                    LOGGER.finest("-> RTCPReceiverThread.run() waiting for packet on MC " + rtcpSession.rtcpMCSock.getLocalPort() );
                } else {
                    LOGGER.finest("-> RTCPReceiverThread.run() waiting for packet on " + rtcpSession.rtcpSock.getLocalPort() );
                }
            }

            // Prepare a packet
            byte[] rawPkt = new byte[RECEIVE_BUFFER_SIZE];
            DatagramPacket packet = new DatagramPacket(rawPkt, rawPkt.length);

            // Wait for it to arrive. On failure the packet holds no data and no
            // sender address, so it must not be processed; the loop condition
            // ends the thread if the session was closed in the meantime.
            if(!receiveInto(packet)) {
                continue;
            }

            if(rtpSession.endSession) {
                // Socket may be closed, do not parse any more packets
                break;
            }

            // Check whether this is one of our own
            if(!isFromOwnSocket(packet)) {
                parsePacket(packet);
            }
        }

        if(LOGGER.isLoggable(Level.FINEST)) {
            LOGGER.finest("<-> RTCPReceiverThread terminating");
        }
    }

}
